package com.parkingwang.learning.rx2;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.observables.GroupedObservable;

import java.util.concurrent.TimeUnit;

public class Demo1 {

    public static void main(String[] args) {

//        testOrder();


        //创建型操作符
//            Observable.just()
//        Observable.fromIterable()
//        Observable.fromArray("hello", "world")
//        Observable.empty()
//        Observable.range(2,5)


        //变换型操作符
//        Observable.empty().map()
        //        Observable.empty().buffer(2)
//        flatMap_concatMap();
//        groupby();

        //过滤型操作符
//        Observable.empty().filter()
//        Observable.empty().distinct()
//                Observable.empty().take()
//        Observable.empty().first()
//            Observable.empty().elementAt(1,"default")


        //条件型操作符
//        Observable.empty().isEmpty()
//        Observable.empty().contains()
//        Observable.empty().all();
//        Observable.empty().any();

        startConcatMergeZip();


        try {
            Thread.sleep(20 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void startConcatMergeZip() {
        //合并操作符

        //startWith 被观察者2先与被观察者1发射订阅数据，下游订阅数据是同一数据类型
//        Observable.just("a","b")
//                .startWith(Observable.just("A","B"))
//                .subscribe(new Consumer<String>() {
//                    @Override
//                    public void accept(String s) throws Exception {
//                        System.out.println(s);
//                    }
//                });

        //concatWith 被观察者1先与被观察者2发射订阅数据，下游订阅数据是同一数据类型
        Observable.just("a", "b")
                .concatWith(Observable.just("A", "B"))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

        //concat 最多合并4个被观察者， 按顺序发射订阅,下游订阅数据是同一数据类型
        Observable.concat(Observable.just(1),
                Observable.just(2),
                Observable.just(3),
                Observable.just(4))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

        //merge 最多合并4个被观察者， 同时并发发射订阅,下游订阅数据是同一数据类型
        System.out.println("merge---------zip");
//        Observable.merge(
//                Observable.intervalRange(1, 5, 0, 2, TimeUnit.SECONDS),
//                Observable.intervalRange(6, 5, 0, 2, TimeUnit.SECONDS),
//                Observable.intervalRange(11, 5, 0, 2, TimeUnit.SECONDS),
//                Observable.intervalRange(16, 5, 0, 2, TimeUnit.SECONDS)
//        ).subscribe(new Consumer<Long>() {
//            @Override
//            public void accept(Long aLong) throws Exception {
//                System.out.println(aLong);
//            }
//        });


        //zip 最多合并9个被观察者，同时并发发射订阅,合并的数据需要匹配相应的索引，超出数量匹配不到的数据会被丢失不被合并到下游，
        // 下游订阅数据可以是不同数据类型
        Observable.zip(
                Observable.just(1, 2, 3),
                Observable.just("a", "b"),
                new BiFunction<Integer, String, String>() {
                    @Override
                    public String apply(Integer i, String s) throws Exception {
                        return new StringBuilder()
                                .append(i).append("---")
                                .append(s).append("---")
                                .toString();
                    }
                }
        ).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }


    private static void groupby() {
        Observable.just(100, 2000, 600, 5000)
                .groupBy(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return integer > 1000 ? "高端" : "低端";
                    }
                }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
            @Override
            public void accept(GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
//                低端分类的价格：100
//                高端分类的价格：2000
//                低端分类的价格：600
//                高端分类的价格：5000
                stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(stringIntegerGroupedObservable.getKey() + "分类的价格：" + integer);
                    }
                });
            }
        });
    }

    private static void flatMap_concatMap() {
        //flatMap() 发射转换时无序,concatMap()发射转换时有序
        Observable.just("a", "b", "c")
//                .flatMap(new Function<String, ObservableSource<String>>() {
//                    @Override
//                    public ObservableSource<String> apply(String s) throws Exception {
//                同时接收到同时转换，不管顺序
//                        System.out.println("flatMap:" + s);
//                        return Observable.just(s).delay(2, TimeUnit.SECONDS);
//                    }
//                })
                .concatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
//                        一个接一个，上一个接收转换完毕，下一个才接收转换
                        System.out.println("concatMap:" + s);
                        return Observable.just(s).delay(2, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void testOrder() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                String s = "hello";
                System.out.println("发射：" + s);//3  延迟两秒发射
                emitter.onNext(s);

                //onComplete和onError的顺序问题：
                //先发射onComplete，再发射onError 会直接报错
                //先发射onError，再发射onComplete，不会报错，回调onError，不会回调onComplete
                emitter.onError(new NullPointerException("空指针"));
                emitter.onComplete();
                System.out.println("发射完成");//5
            }
        })
                .delaySubscription(2, TimeUnit.SECONDS)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) {
                        System.out.println("doOnSubscribe");//1
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");//2
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext:" + s);//4
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");//6
                    }
                });

        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
